【Informatica Data Ingestion】タスクフローから取り込みタスクの設定値を上書きして、ファイル取り込みの並列実行ができるか試してみた
はじめに
こんにちは、データ事業本部の渡部です。
今回は1つのData Ingestion(旧称Mass Ingestion)のファイル取り込みタスクを使い回して、複数のタスクフローから引数を渡すことで取り込み処理が実現するのか試してみます。
もし上記が実現するならば、同じような処理のファイル取り込みタスクは1つだけの作成で済み、アセット管理やテストの簡略化が可能です。
ファイル取り込みタスクは以下で作成したものを使用します。検証のため、「ソース」で設定した「ファイルピックアップ後の操作」はファイルを保持
としています。
試してみた
使用するファイル取り込みタスクは、ローカルフォルダからS3の取り込みであり、ターゲット(S3)のフォルダパスは./sessionlog/${system.year}/${system.month}/${system.day}
と設定されています。
まずはこちらのフォルダパスをタスクフローから渡せるか試します。
タスクフローでは取り込みタスクステップを使用して、上述のファイル取り込みタスクを設定します。
取り込みタスクの「入力フィールド」の「+」を押下して、「Target」>「フォルダパス」を選択します。
フォルダパスの設定値は、元の取り込みタスクの設定値の日付部分を以下のようにtest1
と変更します。
変更前:./sessionlog/${system.year}/${system.month}/${system.day}
変更後:./sessionlog/${system.year}/${system.month}/test1
タスクフローでData Ingestionのシステム変数が使用できるかどうかは疑問です。
タスクフローを保存して実行してみると、ジョブが成功しました。
S3には想定どおりにファイルが取り込まれています。
このことから以下がわかりました。
- タスクフローからData Ingestionの値の上書きが可能
- 上書きする値はData Ingestionのシステム変数も使用可能
続いてタスクフローの同時実行を試します。
同じファイル取り込みタスクを同時に実行するので、何かしらの問題が発生するかもしれません。
ということで、検証用に同じファイル取り込みタスクを呼び出すタスクフローを2つ作成しました。
ターゲットのフォルダパスのプレフィクスはtest2
・test3
とそれぞれ設定しています。
早速タスクフローを実行してみると・・
後に実行したタスクフローが一時停止となってしまいました。
取り込みタスクのエラーメッセージはYou cannot run the task because it is currently running.
とのことです。
「タスクが動いてるから実行できないよ〜」と言われてしまいました。
「同時実行はできないか・・」と思ってしまう状況ですが、以前ファイル取り込みタスクに「並列処理を許可」の設定があることを覚えていました。
そのためこちらの設定をオンにし、2並列で実行できるか試すため「並行バッチ」を2としました。
また一方で、成功していると見えたもう一つのジョブも実はファイル取り込みができていませんでした。
こちらは「重複するファイルをスキップ」をオンにしていたことが原因だったため、オフとしました。
改修後、2つのタスクフローを同時に実行すると今度は2つとも成功しました。
S3には想定どおりのフォルダパスに、ファイルが取り込まれていることが確認できました。
さいごに
今回の結論としては、以下となります。
- タスクフローからData Ingestionで使用する設定値の上書きが可能
- 上書きする値はData Ingestionのシステム変数も使用可能
- Data Ingestionの並列処理を許可すれば、1つのタスクを複数のタスクフローから並列実行が可能
ちなみに今回の検証では複数タスクフローを作成して設定値を上書きしましたが、
設定値をタスクフローの入力フィールドから渡すようにして、タスクフローを実行するAPIの引数として渡すことも可能かと思います。
そうすることで要件さえ合えば、 Data Ingestionとタスクフローの開発は1つずつで済ませることができるかもしれません 。
またAPI以外にはタスクフローではパラメータセットの使用もできるので、CDIのパラメータファイルと同じような方法での実装も可能なのではないかと想像しております。
このあたりは要検証です。
なお並列実行についてはこちらのドキュメントで言及されていて、「fmi-task-max-pool-size」の数が最大で確保できる並列数とのことです。
最大の並列数に近づくとパフォーマンス悪化するようになるので注意が必要です。
ちょっと追加情報
以下ドキュメントでユーザー定義変数について言及されています。
ユーザー定義変数で何ができるのかというと、Data Ingestionの設定値を任意の変数で設定しておいて、Data IngestionをAPIで実行する際に、引数として変数値を渡して動的に実行が可能です。
今回のタスクフローでもAPIとして呼び出せるようにすれば、同じように処理を組むこともできますが、タスクフローを必要としない選択肢として覚えておきたいところです。